﻿using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Serializer;
using MQTTnet.Server;
using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;

namespace MQTTnetTest
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();

            MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished;
        }
        private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>();

        private IMqttClient _mqttClient;
        private IMqttServer _mqttServer;
        private void btnStart_Click(object sender, EventArgs e)
        {
            if (_mqttServer != null)
            {
                return;
            }

            JsonServerStorage storage;
            //   if (ServerPersistRetainedMessages.IsChecked == true)
            {
                storage = new JsonServerStorage();

                // if (ServerClearRetainedMessages.IsChecked == true)
                {
                    storage.Clear();
                }
            }

            _mqttServer = new MqttFactory().CreateMqttServer();

            var options = new MqttServerOptions();
            options.DefaultEndpointOptions.Port = int.Parse("1883");
            options.DefaultEndpointOptions.BoundIPAddress = IPAddress.Parse("127.0.0.1");
            options.Storage = storage;
            options.ConnectionValidator = c =>
            {
                if (c.ClientId.Length < 10)
                {

                    //return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                }

                if (c.Username != "666666" || c.Password != "666666")
                {
                    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedUnacceptableProtocolVersion;

                }

            };
            _mqttServer.StartAsync(options);

            _mqttServer.ClientConnected += _mqttServer_ClientConnected;
            //   s.RunSynchronously();
            btnStart.Enabled = false;
            btnStop.Enabled = true;
        }
        private void btnStop_Click(object sender, EventArgs e)
        {
            if (_mqttServer == null)
            {
                return;
            }

            _mqttServer.StopAsync();
            _mqttServer = null;
            btnStart.Enabled = true;
            btnStop.Enabled = false;
        }


        private void _mqttServer_ClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
        {
            string d = "";
            //throw new NotImplementedException();
        }

        private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
        {
            _traceMessages.Enqueue(e.TraceMessage);
            await UpdateLogAsync();
        }

        private async Task UpdateLogAsync()
        {
            //while (_traceMessages.Count > 100)
            //{
            //    _traceMessages.TryDequeue(out _);
            //}

            var logText = new StringBuilder();
            foreach (var traceMessage in _traceMessages)
            {
                logText.AppendFormat(
                    "[{0:yyyy-MM-Dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
                    traceMessage.Timestamp,
                    traceMessage.Level,
                    traceMessage.Source,
                    traceMessage.ThreadId,
                    traceMessage.Message,
                    Environment.NewLine);

                if (traceMessage.Exception != null)
                {
                    logText.AppendLine(traceMessage.Exception.ToString());
                }
            }
            await Task.Run(() =>
            {
                Invoke(new Action(() =>
                {

                    //主线程直接记日志 
                    txtInfo.Text += logText;
                    //    Trace.Text = logText.ToString();

                }));
            });

        }




        private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
        {
            var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}";

            //await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () =>
            //{
            //    if (AddReceivedMessagesToList.IsChecked == true)
            //    {
            //        ReceivedMessages.Items.Add(item);
            //    }
            //});
        }
        /// <summary>
        /// 客户端连接
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>

        private void btnConnect_Click(object sender, EventArgs e)
        {
            try
            {
                Connect();
            }
            catch (Exception)
            {

            }

        }
        private async void Connect()
        {
            var tlsOptions = new MqttClientTlsOptions
            {
                UseTls = false,//使用者
                IgnoreCertificateChainErrors = true,//忽略验证错误
                IgnoreCertificateRevocationErrors = true,//忽略逻辑错误
                AllowUntrustedCertificates = true//允许不可信证书
            };


            var options = new MqttClientOptions
            {
                ClientId = "12345678787812",
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = "127.0.0.1",
                    Port = int.Parse("1883"),
                    TlsOptions = tlsOptions
                },
                Credentials = new MqttClientCredentials
                {
                    Username = "666666",
                    Password = "666666"
                },
                CleanSession = false,
                KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse("60")),
                ProtocolVersion = MqttProtocolVersion.V311
            };

            if (options.ChannelOptions == null)
            {
                throw new InvalidOperationException();
            }
            try
            {
                if (_mqttClient != null)
                {
                    await _mqttClient.DisconnectAsync();
                    _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;
                    _mqttClient.Connected -= OnConnected;
                    _mqttClient.Disconnected -= OnDisconnected;

                }

                var factory = new MqttFactory();
                _mqttClient = factory.CreateMqttClient();
                _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;//收到消息
                _mqttClient.Connected += OnConnected;//连接
                _mqttClient.Disconnected += OnDisconnected;//断开

                await _mqttClient.ConnectAsync(options);


            }
            catch (Exception exception)
            {
                string s = exception.Message;
                txtInfo.Text += exception + Environment.NewLine;
                //  Trace.Text += exception + Environment.NewLine;
            }



        }

        private void OnConnected(object sender, MqttClientConnectedEventArgs e)
        {
            // _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,"", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null));

            //  Task.Run(UpdateLogAsync);
        }
        private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            //   _traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1,"", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null));

            //Task.Run(UpdateLogAsync);
        }

        private void btnClear_Click(object sender, EventArgs e)
        {
            txtInfo.Text = "";
        }

        private void btnSubscribe_Click(object sender, EventArgs e)
        {
            _mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());

        }

        private void btnPublish_Click(object sender, EventArgs e)
        {
            var applicationMessage = new MqttApplicationMessageBuilder()
                   .WithTopic("my/topic")
                   .WithPayload("Hello World")
                   .WithAtLeastOnceQoS()
                   .Build();

            _mqttClient.PublishAsync(applicationMessage);
        }

        private void btnUnsubscribe_Click(object sender, EventArgs e)
        {
            _mqttClient.UnsubscribeAsync("my/topic");
        }
    }
}
